{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- Session_Connection_Time: double (nullable = true)\n",
      " |-- Bytes Transferred: double (nullable = true)\n",
      " |-- Kali_Trace_Used: integer (nullable = true)\n",
      " |-- Servers_Corrupted: double (nullable = true)\n",
      " |-- Pages_Corrupted: double (nullable = true)\n",
      " |-- Location: string (nullable = true)\n",
      " |-- WPM_Typing_Speed: double (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark = SparkSession.builder.appName('cluster').getOrCreate()\n",
    "df = spark.read.csv('hack_data.csv', inferSchema=True, header=True)\n",
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['Session_Connection_Time',\n",
       " 'Bytes Transferred',\n",
       " 'Kali_Trace_Used',\n",
       " 'Servers_Corrupted',\n",
       " 'Pages_Corrupted',\n",
       " 'Location',\n",
       " 'WPM_Typing_Speed']"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(Session_Connection_Time=8.0, Bytes Transferred=391.09, Kali_Trace_Used=1, Servers_Corrupted=2.96, Pages_Corrupted=7.0, Location='Slovenia', WPM_Typing_Speed=72.37)]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- Session_Connection_Time: double (nullable = true)\n",
      " |-- Bytes Transferred: double (nullable = true)\n",
      " |-- Kali_Trace_Used: integer (nullable = true)\n",
      " |-- Servers_Corrupted: double (nullable = true)\n",
      " |-- Pages_Corrupted: double (nullable = true)\n",
      " |-- Location: string (nullable = true)\n",
      " |-- WPM_Typing_Speed: double (nullable = true)\n",
      " |-- features: vector (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.feature import VectorAssembler\n",
    "\n",
    "assembler = VectorAssembler(inputCols = ['Session_Connection_Time',\n",
    " 'Bytes Transferred',\n",
    " 'Kali_Trace_Used',\n",
    " 'Servers_Corrupted',\n",
    " 'Pages_Corrupted',\n",
    " 'WPM_Typing_Speed'], outputCol = 'features')\n",
    "final_df = assembler.transform(df)\n",
    "final_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- Session_Connection_Time: double (nullable = true)\n",
      " |-- Bytes Transferred: double (nullable = true)\n",
      " |-- Kali_Trace_Used: integer (nullable = true)\n",
      " |-- Servers_Corrupted: double (nullable = true)\n",
      " |-- Pages_Corrupted: double (nullable = true)\n",
      " |-- Location: string (nullable = true)\n",
      " |-- WPM_Typing_Speed: double (nullable = true)\n",
      " |-- features: vector (nullable = true)\n",
      " |-- scaledFeatures: vector (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.feature import StandardScaler\n",
    "\n",
    "scaler = StandardScaler(inputCol = 'features', outputCol = 'scaledFeatures')\n",
    "scaler_model = scaler.fit(final_df)\n",
    "final_df = scaler_model.transform(final_df)\n",
    "final_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+\n",
      "|            features|      scaledFeatures|\n",
      "+--------------------+--------------------+\n",
      "|[8.0,391.09,1.0,2...|[0.56785108466505...|\n",
      "|[20.0,720.99,0.0,...|[1.41962771166263...|\n",
      "|[31.0,356.32,1.0,...|[2.20042295307707...|\n",
      "+--------------------+--------------------+\n",
      "only showing top 3 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "final_df.select('features','scaledFeatures').show(3)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(Session_Connection_Time=8.0, Bytes Transferred=391.09, Kali_Trace_Used=1, Servers_Corrupted=2.96, Pages_Corrupted=7.0, Location='Slovenia', WPM_Typing_Speed=72.37, features=DenseVector([8.0, 391.09, 1.0, 2.96, 7.0, 72.37]), scaledFeatures=DenseVector([0.5679, 1.3658, 1.9976, 1.2859, 2.2849, 5.3963]))]"
      ]
     },
     "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "final_df.take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.clustering import KMeans\n",
    "\n",
    "kmeans2 = KMeans(featuresCol = 'scaledFeatures', k=2)\n",
    "kmeans3 = KMeans(featuresCol = 'scaledFeatures', k=3)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "model_k2 = kmeans2.fit(final_df)\n",
    "model_k3 = kmeans3.fit(final_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "WSSSE_K2: 601.7707512676716\n",
      "WSSSE_K3: 434.1492898715845\n"
     ]
    }
   ],
   "source": [
    "print('WSSSE_K2:', model_k2.computeCost(final_df))\n",
    "print('WSSSE_K3:', model_k3.computeCost(final_df))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+-----+\n",
      "|prediction|count|\n",
      "+----------+-----+\n",
      "|         1|  167|\n",
      "|         0|  167|\n",
      "+----------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "model_k2.transform(final_df).groupBy('prediction').count().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+-----+\n",
      "|prediction|count|\n",
      "+----------+-----+\n",
      "|         1|  167|\n",
      "|         2|   84|\n",
      "|         0|   83|\n",
      "+----------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "model_k3.transform(final_df).groupBy('prediction').count().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
